feat: metrics integration for commit and scan #701
|
|
||
| namespace iceberg { | ||
|
|
||
| class MetricsReporter; |
Should we add this forward declaration to iceberg/type_fwd.h?
|
|
||
| RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr<FileIO> file_io, | ||
| std::unique_ptr<HttpClient> client, | ||
| std::shared_ptr<HttpClient> client, |
This is because MakeTableReporter() hands the same client to a per-table RestMetricsReporter, which must keep posting metrics independently of the catalog's lifetime. Alternatively, the table reporter could own its own HttpClient, but I feel the initialization burden could be high.
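The lifetime argument can be illustrated with a minimal sketch. `FakeHttpClient`, `Catalog` and `TableReporter` below are hypothetical stand-ins, not the classes in this PR; the sketch only shows that a `std::shared_ptr` keeps the client usable after the catalog that created it is gone.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for HttpClient: records the bodies posted to it.
struct FakeHttpClient {
  std::vector<std::string> posted;
  void Post(const std::string& body) { posted.push_back(body); }
};

// Hypothetical per-table reporter that shares ownership of the client.
class TableReporter {
 public:
  explicit TableReporter(std::shared_ptr<FakeHttpClient> client)
      : client_(std::move(client)) {}
  void Report(const std::string& body) { client_->Post(body); }
  std::size_t PostedCount() const { return client_->posted.size(); }

 private:
  std::shared_ptr<FakeHttpClient> client_;
};

// Hypothetical catalog that hands its client to each reporter it creates.
class Catalog {
 public:
  Catalog() : client_(std::make_shared<FakeHttpClient>()) {}
  std::unique_ptr<TableReporter> MakeTableReporter() const {
    return std::make_unique<TableReporter>(client_);
  }

 private:
  std::shared_ptr<FakeHttpClient> client_;
};

// Creates a reporter, destroys the catalog, then reports through the reporter.
std::size_t ReportAfterCatalogDestroyed() {
  std::unique_ptr<TableReporter> reporter;
  {
    Catalog catalog;
    reporter = catalog.MakeTableReporter();
  }  // The catalog is destroyed here; the reporter still holds the client.
  reporter->Report("{}");
  return reporter->PostedCount();
}
```

With a `std::unique_ptr` owned by the catalog, the reporter would have to hold a raw pointer or reference, which would dangle in the same scenario.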
Pull request overview
Adds core metrics reporting to Iceberg C++ commit and scan workflows, including wiring metrics reporters through catalogs/tables and emitting REST-spec-compatible reports.
Changes:
- Emit `CommitReport` from `Transaction::Commit()` when a commit produces a new snapshot, and propagate table reporters into update operations.
- Collect and emit `ScanReport` (with scan planning counters/timers) from `DataTableScan::PlanFiles()`, including new manifest/file skip counters.
- Introduce `RestMetricsReporter` and catalog-level wiring (REST/SQL/InMemory) plus new end-to-end/unit tests for scan planning and REST reporter behavior.
Reviewed changes
Copilot reviewed 28 out of 29 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/iceberg/update/snapshot_update.h | Adds per-snapshot-update reporter plumbing (ReportWith) and stores a reporter pointer. |
| src/iceberg/transaction.cc | Adds commit timing/attempt metrics and emits CommitReport after successful snapshot-producing commits; propagates reporter to FastAppend in transactions. |
| src/iceberg/test/scan_planning_metrics_test.cc | New end-to-end tests validating ScanReport emission and scan planning counters. |
| src/iceberg/test/rest_metrics_reporter_test.cc | New tests asserting REST reporter suppresses HTTP failures. |
| src/iceberg/test/fast_append_test.cc | Adds integration tests verifying CommitReport emission behavior for snapshot vs property-only commits. |
| src/iceberg/test/CMakeLists.txt | Registers the new test sources in the build. |
| src/iceberg/table.h | Extends Table::Make/StagedTable::Make to accept an optional reporter; adds reporter accessors and combining API. |
| src/iceberg/table.cc | Stores reporter on tables; wires reporter + table name into scans and propagates reporter to FastAppend. |
| src/iceberg/table_scan.h | Extends scan context and builder API to include table name + metrics reporter. |
| src/iceberg/table_scan.cc | Collects scan planning metrics, attaches them to manifest planning, and emits ScanReport. |
| src/iceberg/manifest/manifest_reader.h | Adds SkipCounter() hook to count per-entry skips during manifest reading. |
| src/iceberg/manifest/manifest_reader.cc | Implements skip counting for partition/metrics/partition-set filtering. |
| src/iceberg/manifest/manifest_reader_internal.h | Stores skip_counter_ in the reader implementation. |
| src/iceberg/manifest/manifest_group.h | Adds ability to attach scan metrics to manifest planning. |
| src/iceberg/manifest/manifest_group.cc | Increments scan counters for manifests/files and delete-file result accounting; wires reader skip counter. |
| src/iceberg/delete_file_index.h | Adds optional scan metrics hook (non-owning) for delete-manifest accounting. |
| src/iceberg/delete_file_index.cc | Counts scanned/skipped delete manifests and skipped delete files while building the delete index. |
| src/iceberg/catalog/sql/sql_catalog.h | Stores a catalog-level reporter for tables loaded/created by SQL catalog. |
| src/iceberg/catalog/sql/sql_catalog.cc | Loads configured reporter and passes it into created/loaded tables. |
| src/iceberg/catalog/rest/rest_metrics_reporter.h | Introduces REST metrics reporter interface for POSTing reports to the REST endpoint. |
| src/iceberg/catalog/rest/rest_metrics_reporter.cc | Implements JSON serialization + POST and suppresses failures (fire-and-forget). |
| src/iceberg/catalog/rest/rest_catalog.h | Switches REST catalog HTTP client ownership to shared_ptr; adds per-table reporter builder. |
| src/iceberg/catalog/rest/rest_catalog.cc | Loads configured reporter and optionally combines it with a per-table REST reporter when enabled/supported. |
| src/iceberg/catalog/rest/meson.build | Adds new REST metrics reporter source to Meson build. |
| src/iceberg/catalog/rest/CMakeLists.txt | Adds new REST metrics reporter source to CMake build. |
| src/iceberg/catalog/rest/catalog_properties.h | Adds rest-metrics-reporting-enabled property. |
| src/iceberg/catalog/memory/in_memory_catalog.h | Stores a catalog-level reporter for in-memory catalog tables. |
| src/iceberg/catalog/memory/in_memory_catalog.cc | Loads configured reporter and passes it into created/loaded tables. |
| .gitignore | Ignores additional build output directories. |
| auto metrics_context = MetricsContext::Default(); | ||
| std::shared_ptr<ScanMetrics> scan_metrics = ScanMetrics::Make(*metrics_context); | ||
| auto timed = scan_metrics->total_planning_duration->Start(); |
| Status RestMetricsReporter::Report(const MetricsReport& report) { | ||
| // Serialize the report variant to JSON. | ||
| Result<nlohmann::json> json_result = std::visit( | ||
| [](const auto& r) -> Result<nlohmann::json> { return ToJson(r); }, report); | ||
| if (!json_result) { | ||
| return {}; | ||
| } | ||
|
|
||
| // Inject "report-type" required by the REST spec (not included in core ToJson). | ||
| auto& json = json_result.value(); | ||
| json[kReportType] = | ||
| std::holds_alternative<ScanReport>(report) ? kScanReportType : kCommitReportType; | ||
|
|
||
| // POST to the metrics endpoint; suppress errors to match Java fire-and-forget behavior. | ||
| std::ignore = client_->Post(metrics_endpoint_, json.dump(), /*headers=*/{}, | ||
| *DefaultErrorHandler::Instance(), *session_); | ||
| return {}; | ||
| } |
| CapturingReporter* g_capturing_reporter = nullptr; | ||
|
|
||
| void RegisterCapturingReporter() { | ||
| static std::once_flag flag; | ||
| std::call_once(flag, [] { | ||
| (void)MetricsReporters::Register( | ||
| "fast.append.test.reporter", | ||
| [](const auto&) -> Result<std::unique_ptr<MetricsReporter>> { | ||
| auto ptr = std::make_unique<CapturingReporter>(); | ||
| g_capturing_reporter = ptr.get(); | ||
| return ptr; | ||
| }); |
| ScanReport report{ | ||
| .table_name = context_.table_name, | ||
| .snapshot_id = snapshot->snapshot_id, | ||
| .filter = context_.filter, |
This sends the raw scan filter to metrics reporters. Java sanitizes the filter before reporting, so literal predicate values are not exposed. With the current code, a scan like email = 'x' posts that value to the REST metrics endpoint or any custom reporter.
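As a rough illustration of the concern, the sketch below renders a predicate with and without its literal. `Predicate` is a hypothetical flattened type (the real filter is an expression tree), and the `(redacted)` placeholder is an assumption made for this example, not the format Java emits.

```cpp
#include <cassert>
#include <string>

// Hypothetical flattened predicate; the real scan filter is an expression tree.
struct Predicate {
  std::string column;
  std::string op;
  std::string literal;
};

// Renders the predicate as-is, exposing the literal value.
std::string ToRawString(const Predicate& p) {
  return p.column + " " + p.op + " '" + p.literal + "'";
}

// Renders the predicate with the literal replaced by a fixed placeholder.
// The placeholder text is an assumption for illustration only.
std::string ToSanitizedString(const Predicate& p) {
  return p.column + " " + p.op + " (redacted)";
}
```

A reporter that receives only the sanitized form can still see which columns and operators a scan used, but not the values being compared.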
| /// \param reporter The metrics reporter to use. | ||
| /// \return Reference to this for method chaining. | ||
| auto& ReportWith(this auto& self, std::shared_ptr<MetricsReporter> reporter) { | ||
| static_cast<SnapshotUpdate&>(self).reporter_ = std::move(reporter); |
This stores the reporter on SnapshotUpdate, but nothing reads reporter_ when building the CommitReport; Transaction::Commit uses ctx_->table->reporter() instead. A caller that does fast_append->ReportWith(custom_reporter) will not receive a commit report unless the table already has that reporter. Java's SnapshotProducer.reportWith overrides the operation reporter.
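One possible shape for the override semantics described above is a small resolution helper. `Reporter` and `ResolveReporter` are hypothetical names for this sketch; the PR may wire this differently.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical reporter type, identified by name for the example.
struct Reporter {
  std::string name;
};

// Returns the operation-level reporter when one was set via ReportWith-style
// plumbing, and falls back to the table-level reporter otherwise.
std::shared_ptr<Reporter> ResolveReporter(
    const std::shared_ptr<Reporter>& operation_reporter,
    const std::shared_ptr<Reporter>& table_reporter) {
  return operation_reporter ? operation_reporter : table_reporter;
}
```

Under this assumption, the commit path would call the helper once when building the report instead of reading the table's reporter directly.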
| const auto& schema_ptr = projected_schema.get(); | ||
| std::vector<int32_t> projected_field_ids; | ||
| std::vector<std::string> projected_field_names; | ||
| for (const auto& field : schema_ptr->fields()) { |
This only reports top-level projected fields. For nested projections, Java uses TypeUtil.getProjectedIds(schema()) and includes nested child IDs/names. The C++ scan already uses GetProjectedIdsVisitor when resolving selected columns; the report should use the same recursive IDs plus FindColumnNameById.
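The difference between top-level and recursive collection can be sketched as follows. `Field` and `CollectProjected` are hypothetical and do not use the PR's `GetProjectedIdsVisitor` or `FindColumnNameById`; including parent struct IDs alongside their children is a choice made for this sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical schema field; nested struct fields are stored in `children`.
struct Field {
  int32_t id;
  std::string name;
  std::vector<Field> children;
};

// Walks the fields depth-first, appending each field's ID and its
// dot-separated full name, then descending into its children.
void CollectProjected(const std::vector<Field>& fields, const std::string& prefix,
                      std::vector<int32_t>& ids, std::vector<std::string>& names) {
  for (const auto& field : fields) {
    std::string full_name = prefix.empty() ? field.name : prefix + "." + field.name;
    ids.push_back(field.id);
    names.push_back(full_name);
    CollectProjected(field.children, full_name, ids, names);
  }
}
```

A loop over only the top-level fields would stop at `location` in a schema like `id, location{lat, lon}` and never report `location.lat` or `location.lon`.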
| if (scan_metrics_) { | ||
| for (const auto& task : file_tasks) { | ||
| for (const auto& df : task->delete_files()) { | ||
| scan_metrics_->result_delete_files->Increment(1); |
These counters are updated per FileScanTask, so the same delete file is counted once for every data file it applies to. Java increments indexed/type delete counters once while building DeleteFileIndex. A global or partition delete file that matches N data files will inflate indexed_delete_files and the delete type counters by N here.
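The inflation is easy to reproduce with a toy model. `Task` and both counting functions are hypothetical; deduplicating by path is used here only to show the size of the gap, whereas the comment above suggests counting once while the delete index is built.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical scan task: one data file plus the delete files applied to it.
struct Task {
  std::vector<std::string> delete_file_paths;
};

// Counts one increment per (task, delete file) pair, as the per-task loop does.
std::size_t CountPerTask(const std::vector<Task>& tasks) {
  std::size_t count = 0;
  for (const auto& task : tasks) {
    count += task.delete_file_paths.size();
  }
  return count;
}

// Counts each distinct delete file once, regardless of how many tasks use it.
std::size_t CountDistinct(const std::vector<Task>& tasks) {
  std::unordered_set<std::string> seen;
  for (const auto& task : tasks) {
    seen.insert(task.delete_file_paths.begin(), task.delete_file_paths.end());
  }
  return seen.size();
}
```

With one global delete file applied to three data files and one extra positional delete on a single task, the per-task count is 4 while only 2 delete files exist.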
| ContentFileUtil::DropUnselectedStats(*entry.data_file, columns); | ||
| files.emplace_back(std::move(entry)); | ||
| } else { | ||
| if (scan_metrics_) scan_metrics_->skipped_delete_files->Increment(1); |
This counts delete files older than min_sequence_number_ as skipped_delete_files. Java drops those files in the same sequence-number filter without counting them as skipped. Snapshots with old delete files will report extra skipped delete files.
| auto it = props.find(std::string(kMetricsReporterImpl)); | ||
| if (it != props.end() && !it->second.empty() && | ||
| it->second != kMetricsReporterTypeNoop) { | ||
| if (auto r = MetricsReporters::Load(props); r.has_value()) { |
MetricsReporters::Load errors are silently ignored. If metrics-reporter-impl is misspelled or the factory fails, the REST catalog still initializes with no custom reporter and no diagnostic. Java fails catalog initialization for an invalid metrics reporter; this should propagate the load error.
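A minimal sketch of propagating the failure instead of dropping it is shown below. `LoadResult`, `LoadReporter`, `InitCatalog` and the `known.reporter` name are all hypothetical; the real code would return the library's own `Result`/`Status` types rather than a string.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>

using Properties = std::map<std::string, std::string>;

// Hypothetical result type: a loaded reporter name, or a non-empty error.
struct LoadResult {
  std::optional<std::string> reporter;
  std::string error;
  bool ok() const { return error.empty(); }
};

// Hypothetical loader that recognizes a single reporter implementation.
LoadResult LoadReporter(const Properties& props) {
  auto it = props.find("metrics-reporter-impl");
  if (it == props.end() || it->second.empty()) {
    return {std::nullopt, ""};  // Nothing configured: not an error.
  }
  if (it->second != "known.reporter") {
    return {std::nullopt, "unknown metrics reporter: " + it->second};
  }
  return {it->second, ""};
}

// Hypothetical catalog initialization: returns an empty string on success and
// the loader's error message otherwise, rather than continuing silently.
std::string InitCatalog(const Properties& props) {
  LoadResult loaded = LoadReporter(props);
  if (!loaded.ok()) {
    return loaded.error;
  }
  return "";
}
```

The key point is the early return: a misspelled implementation name surfaces at catalog creation instead of producing a catalog that quietly reports nothing.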
| auto it = properties_.find(std::string(kMetricsReporterImpl)); | ||
| if (it != properties_.end() && !it->second.empty() && | ||
| it->second != kMetricsReporterTypeNoop) { | ||
| if (auto r = MetricsReporters::Load(properties_); r.has_value()) { |
Same issue here: MetricsReporters::Load errors are ignored, so an invalid metrics-reporter-impl silently disables reporting for the in-memory catalog. This should propagate the load failure.
| auto it = config_.props.find(std::string(kMetricsReporterImpl)); | ||
| if (it != config_.props.end() && !it->second.empty() && | ||
| it->second != kMetricsReporterTypeNoop) { | ||
| if (auto r = MetricsReporters::Load(config_.props); r.has_value()) { |
Same issue here: MetricsReporters::Load errors are ignored, so an invalid metrics-reporter-impl silently disables reporting for the SQL catalog. This should propagate the load failure.
| Result<std::unique_ptr<DataTableScanBuilder>> Table::NewScan() const { | ||
| return DataTableScanBuilder::Make(metadata_, io_); | ||
| ICEBERG_ASSIGN_OR_RAISE(auto builder, DataTableScanBuilder::Make(metadata_, io_)); | ||
| builder->TableName(identifier_.ToString()); |
This uses the table identifier, not the fully qualified table name. Java reports BaseTable.name(), and REST builds that as catalog.namespace.table. Scan reports from two catalogs with the same identifier will collide and differ from Java REST payloads.
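To make the naming difference concrete, here is a small sketch of building the `catalog.namespace.table` form. `FullTableName` is a hypothetical helper, not an API in this PR, and it assumes namespace levels are joined with dots.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Joins the catalog name, each namespace level, and the table name with dots.
std::string FullTableName(const std::string& catalog,
                          const std::vector<std::string>& namespace_levels,
                          const std::string& table) {
  std::string full_name = catalog;
  for (const auto& level : namespace_levels) {
    full_name += "." + level;
  }
  full_name += "." + table;
  return full_name;
}
```

Two catalogs that both contain `db.events` then produce distinct report names, which the bare identifier cannot do.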
| const auto& snapshot = snapshot_result.value(); | ||
| const auto op = snapshot->Operation(); | ||
| CommitReport report{ | ||
| .table_name = ctx_->table->name().ToString(), |
Same table-name issue for commit reports: this uses TableIdentifier::ToString(), so the catalog name is missing. Java commit reports use the table's full name (catalog.namespace.table), which avoids collisions across catalogs.
This change implements part 2 of the core metrics work. It covers the REST catalog spec and the integration with the commit and scan workflows. The logging metrics reporter is out of scope and will be added in a subsequent diff.
One remaining difference is that some of the HTTP calls made by the REST catalog metrics reporter are still synchronous; a subsequent change will cover that.